package org.databandtech.clickhouse;

import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.databandtech.clickhouse.entity.AggUserDayCondition;
import org.databandtech.clickhouse.entity.AggUserDayItem;
import ru.yandex.clickhouse.ClickHouseConnection;
import ru.yandex.clickhouse.ClickHouseDataSource;
import ru.yandex.clickhouse.ClickHouseStatement;
import ru.yandex.clickhouse.settings.ClickHouseProperties;
import ru.yandex.clickhouse.settings.ClickHouseQueryParam;

/*
 * 合并表写入聚合表，通过视图
 * CREATE View v_active_count as
 SELECT
    d,
    Source,
    count(Uid) as actives,
    9999 as column1
FROM SNM_merger
where Total_duration > 300
GROUP BY
    Source,d
ORDER BY
    actives desc 
limit 4;

SELECT * from v_active_count;

CREATE TABLE SNM_Agg_User_D (
  datekey String,
  Source String,
  pt String,
  active_count UInt32,
  active_sum UInt32,
  active_rate UInt32,
  total_play UInt32,
  play_mans UInt32,
  play_count UInt32
)
 ENGINE = SummingMergeTree() 
 PARTITION BY Source  
 ORDER BY (datekey) ;
 
 */

public class SinkToAgg {

	final static String DATE = "2021-09-17 "; // 需要sink的日期

	public static void main(String[] args) {
		// TODO Auto-generated method stub

		// 需要执行查询的视图
		Map<String, String> viewsSQL = new HashMap<String, String>();
		viewsSQL.put("v_active_count", "SELECT d,Source,actives,column1 from v_active_count;");
		viewsSQL.put("v_total_duration", "SELECT * from v_total_duration;");

		// 各视图输出的列
		Map<String, String[]> viewColumns = new HashMap<String, String[]>();
		viewColumns.put("v_active_count", new String[] { "actives", "column1" });
		viewColumns.put("v_total_duration", new String[] { "total_duration" });

		// Merge表 - Agg表对应字段配置
		Map<String, String> sinkToAggColumns = new HashMap<String, String>();
		sinkToAggColumns.put("actives", "active_count");
		sinkToAggColumns.put("column1", "active_sum");
		sinkToAggColumns.put("total_duration", "total_play");

		// 数据中间缓存
		List<AggUserDayItem> rowItems = new ArrayList<AggUserDayItem>();
		// 将多个merge视图查询的map拍平，形成agg的dataset
		Map<AggUserDayCondition, Map<String, Integer>> aggDataset = new HashMap<AggUserDayCondition, Map<String, Integer>>();

		String url = Config.URL;
		ClickHouseProperties properties = new ClickHouseProperties();
		properties.setClientName("Agent #1");
		properties.setSessionId("default-session-id");

		ClickHouseDataSource dataSource = new ClickHouseDataSource(url, properties);
		Map<ClickHouseQueryParam, String> additionalDBParams = new HashMap<ClickHouseQueryParam, String>();
		additionalDBParams.put(ClickHouseQueryParam.SESSION_ID, "new-session-id");

		ClickHouseConnection conn = null;
		try {
			conn = dataSource.getConnection();
		} catch (SQLException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		}

		for (Map.Entry<String, String> entry : viewsSQL.entrySet()) {
			String viewKey = entry.getKey();
			String sqlValue = entry.getValue();
			System.out.println(viewKey + ":" + sqlValue);
			// 执行查询
			try (ClickHouseStatement stmt = conn.createStatement();
					ResultSet rs = stmt.executeQuery(sqlValue, additionalDBParams)) {
				while (rs.next()) {
					AggUserDayCondition condition = new AggUserDayCondition(rs.getString("Source"), rs.getString("d"),
							"");
					AggUserDayItem item = new AggUserDayItem(condition);

					String[] columns = viewColumns.get(viewKey);
					Map<String, Integer> valueMap = new HashMap<String, Integer>();

					for (String column : columns) {
						valueMap.put(column, rs.getInt(column));
					}
					item.addMap(valueMap);
					rowItems.add(item);
				}

			} catch (SQLException e) {
				// TODO Auto-generated catch block
				e.printStackTrace();
			}
		}

		// 根据rowItems向聚合表写数据，将来可考虑把rowItems的数据写入kafka，使数据插入异步化.在此仅说明用法
		System.out.println(rowItems);
		for (AggUserDayItem entry : rowItems) {
			if (!aggDataset.containsKey(entry.getCondition())) {
				Map<String , Integer> firstMap = new HashMap<String , Integer>();
				for (Map.Entry<String, Integer> m : entry.getValueMap().entrySet()) {

					String oldKey = m.getKey();
					String aggKey = oldKey.replace(oldKey, sinkToAggColumns.get(oldKey));
					Integer value = m.getValue();
					firstMap.put(aggKey, value);
				}
				aggDataset.put(entry.getCondition(), firstMap);
				System.out.println("第一次插值:"+aggDataset);
			}
			else {
				Map<String , Integer> newMap = new HashMap<String , Integer>();
				Map<String , Integer> oldMap = aggDataset.get(entry.getCondition());
				for (Map.Entry<String, Integer> old : oldMap.entrySet()) {
					String oldKey = old.getKey();
					Integer value = old.getValue();
					newMap.put(oldKey, value);
				}
				System.out.println("旧map:"+newMap);
				for (Map.Entry<String, Integer> newitem : entry.getValueMap().entrySet()) {
					String key = newitem.getKey().replace(newitem.getKey(), sinkToAggColumns.get(newitem.getKey()));;
					Integer value = newitem.getValue();
					newMap.put(key, value);
				}
				System.out.println("新map:"+newMap);
				aggDataset.put(entry.getCondition(), newMap);
			}
		}
		
		System.out.println("aggDataset:"+aggDataset);
		
		for (Map.Entry<AggUserDayCondition, Map<String, Integer>> item : aggDataset.entrySet()) {
			AggUserDayCondition Key = item.getKey();
			Map<String, Integer> Value = item.getValue();

			int active_count = Value.containsKey("active_count")? Value.get("active_count"):0;
			int active_sum = Value.containsKey("active_sum")? Value.get("active_sum"):0;
			int active_rate = Value.containsKey("active_rate")? Value.get("active_rate"):0;
			int total_play = Value.containsKey("total_play")? Value.get("total_play"):0;
			int play_mans = Value.containsKey("play_mans")? Value.get("play_mans"):0;
			int play_count = Value.containsKey("play_count")? Value.get("play_count"):0;
			
			try {
				ClickHouseStatement stmt = conn.createStatement();
				String insert_sql = "INSERT INTO SNM_Agg_User_D("
						+ "datekey,Source,pt,active_count,active_sum,active_rate,total_play,play_mans,play_count) "
						+ "VALUES('" + Key.getDatekey() + "','"+ Key.getSource() + "','" + Key.getPt() + "',"
						+ active_count+","+active_sum+","+active_rate+","+total_play+","+play_mans+","+play_count+")";
				System.out.println(insert_sql);
				// 单条执行非常慢，弃用
				// stmt.execute(insert_sql);
				 stmt.addBatch(insert_sql);
				 stmt.executeBatch();
			} catch (SQLException e1) {
				// TODO Auto-generated catch block
				e1.printStackTrace();
			}
		}

	}

}
